package com.zjl.SpringBoot.第21章_流式_响应式_编程.B_Reactive响应式基础;

import com.zjl.util.OutColour;
import org.junit.jupiter.api.Test;

import java.util.concurrent.Flow;
import java.util.concurrent.SubmissionPublisher;

/**
 * ClassName: C_FlowDemo
 * Package: com.zjl.SpringBoot.第21章_流式_响应式_编程.B_Reactive响应式基础
 * Description: 基于 JDK9 的源码响应式
 * @Author 张蛟龙
 * @Create 2025/3/11 21:14
 * @Version 1.0
 */
public class C_FlowDemo {
    @Test
    public void 发布者和订阅者() {

        /**
         * 发布者:Publisher
         * 订阅者:Subscriber
         * 订阅关系:Subscription
         *
         */
        //1.定义发布者  Flow.Publisher  观察者模式
        SubmissionPublisher<String> publisher = new SubmissionPublisher<>();


        //2.定义订阅者  Flow.Subscriber
        Flow.Subscriber<String> subscriber = getStringSubscriber("001>>");
        Flow.Subscriber<String> subscriber1 = getStringSubscriber("002>>");


        //3.发布者 绑定  订阅者  （发布者订阅者模式，不会存在使用完就消失）
        publisher.subscribe(subscriber);//会告诉发布者，我准备好接收数据了
        for (int i = 0; i < 5; i++) {
            publisher.submit("发布者发布数据 ：" + i);
            if (i == 2) {
                publisher.subscribe(subscriber1);//会告诉发布者，我准备好接收数据了
            }
            //将发布的数据 放入  buffer区中
        }

        System.out.println("------------------------------------------");
        try {
            Thread.sleep(1000);
        } catch (InterruptedException e) {
            throw new RuntimeException(e);
        }

        publisher.close();//发布者关闭

    }
    @Test
    public void 发布者和中间商和订阅者() {

        /**
         * 发布者:Publisher
         * 订阅者:Subscriber
         * 即发布者又订阅者:MyProcessor
         * 订阅关系:Subscription
         *
         */
        //1.定义发布者  Flow.Publisher  观察者模式
        SubmissionPublisher<String> publisher = new SubmissionPublisher<>();

        MyProcessor myProcessor = new MyProcessor();//中间商
        publisher.subscribe(myProcessor);//中间商订阅 第一个 发布者

        //2.定义订阅者  Flow.Subscriber
        Flow.Subscriber<String> subscriber = getStringSubscriber("001>>");
        Flow.Subscriber<String> subscriber1 = getStringSubscriber("002>>");


        //3.发布者 绑定  订阅者  （发布者订阅者模式，不会存在使用完就消失）
        myProcessor.subscribe(subscriber);//会告诉发布者，我准备好接收数据了
        for (int i = 0; i < 5; i++) {
            publisher.submit("发布者发布数据 ：" + i);
            if (i == 2) {
                try {
                    Thread.sleep(10);
                } catch (InterruptedException e) {
                    throw new RuntimeException(e);
                }
                myProcessor.subscribe(subscriber1);//会告诉发布者，我准备好接收数据了
            }
            //将发布的数据 放入  buffer区中
        }

        System.out.println("------------------------------------------");
        try {
            Thread.sleep(1000);
        } catch (InterruptedException e) {
            throw new RuntimeException(e);
        }

        publisher.close();//发布者关闭
        myProcessor.close();//发布者和订阅者  关闭

    }

    class  MyProcessor extends SubmissionPublisher<String> implements Flow.Processor<String,String>{


        private Flow.Subscription subscription;

        private String name = "中间商>>";

        @Override//在订阅时onXxx: 在xxx事件发生时，执行这个回调
        public void onSubscribe(Flow.Subscription subscription) {
            OutColour.out.printlnYellow(name,"Processor 订阅绑定成功：", subscription);
            this.subscription = subscription;
            this.subscription.request(1);//获取n个元素
        }

        @Override//在下一个元素到达时，执行这个回调，接受到新的数据
        public void onNext(String item) {
            OutColour.out.printlnBlue(name,"Processor，接受到数据：", item);
            item += "__加工后";
            submit(item);//加工后发出去
            this.subscription.request(1);//获取n个元素
        }

        @Override//在错误发生时执行
        public void onError(Throwable throwable) {
            OutColour.out.printlnRed( name,"订阅者，接受到错误信号：", throwable);
        }

        @Override//在完成时
        public void onComplete() {
            OutColour.out.printlnGreen( name,"Processor，完成了信号！！");
        }
    }

    //获取订阅者
    private static Flow.Subscriber<String> getStringSubscriber(String name) {
        Flow.Subscriber<String> subscriber1 = new Flow.Subscriber<>() {
            private Flow.Subscription subscription;

            @Override//在订阅时onXxx: 在xxx事件发生时，执行这个回调
            public void onSubscribe(Flow.Subscription subscription) {
                OutColour.out.printlnYellow(name, "订阅开始了：", subscription);
                this.subscription = subscription;
                this.subscription.request(1);//获取n个元素
            }

            @Override//在下一个元素到达时，执行这个回调，接受到新的数据
            public void onNext(String item) {
                OutColour.out.printlnBlue(name, "订阅者，接受到数据：", item);
                this.subscription.request(1);//获取n个元素
            }

            @Override//在错误发生时执行
            public void onError(Throwable throwable) {
                OutColour.out.printlnRed(name, "订阅者，接受到错误信号：", throwable);
            }

            @Override//在完成时
            public void onComplete() {
                OutColour.out.printlnGreen(name, "订阅者，完成了信号！！");
            }
        };
        return subscriber1;
    }

}
